import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

//Poll方式：启动spark-streaming应用程序，通过sinks的port端口为8888 去flume所在机器拉取数据
object SparkStreamingPollFlume
{
  //currentValues：当前批次汇总成的(word,1)中相同单词的所有的1，即汇总当前批次每个单词出现的所有的1，如(hadoop,1) (hadoop,1)(hadoop,1)
  //historyValues：历史的所有相同key的value总和，即在之前所有批次中所汇总的每个单词出现的总次数，如(hadoop,3)
  def updateFunc(currentValues:Seq[Int], historyValues:Option[Int]):Option[Int] =
  {
    //Option类型变量调用getOrElse(0)的用法：如果获取不出该变量的值，则返回值默认值0
    val newValue: Int = currentValues.sum + historyValues.getOrElse(0)
    //{}中的最后一行Some(返回值)作为当前函数的返回值，要求函数返回类型Option类型，而Some为Option的子类。
    //当函数的返回类型是Option类型时，那么如果没有返回值时使用None，None是Option的子类，相当于java的null；
    //那么如果有值返回时，就使用Some来包含这个值，Some也是Option的子类。
    Some(newValue)
  }

  def main(args: Array[String]): Unit =
  {
    //1、创建sparkConf
    //setMaster("local[2]")：本地模式运行，启动两个线程
    //设置master的地址local[N] ,n必须大于1，其中1个线程负责去接受数据，另一线程负责处理接受到的数据
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingPollFlume").setMaster("local[2]")
    //2、创建sparkContext
    val sc = new SparkContext(sparkConf)
    //设置日志输出的级别
    sc.setLogLevel("WARN")
    //3、构建StreamingContext对象，每个批处理的时间间隔，即以多少秒内的数据划分为一个批次 ，当前设置 以5秒内的数据 划分为一个批次，
    //   每一个batch(批次)就构成一个RDD数据集。DStream就是一个个batch(批次)的有序序列，时间是连续的，
    //   按照时间间隔将数据流分割成一个个离散的RDD数据集。
    val ssc = new StreamingContext(sc, Seconds(5))
    //使用了updateStateByKey方法，就必须设置checkpoint目录，用于缓存中间结果，即把所有批次的结果都先缓存在checkpoint目录中
    //设置checkpoint路径，当前项目下有一个flume目录
    ssc.checkpoint("./flume")
    //4、通过FlumeUtils调用createPollingStream方法获取flume中的数据
    // 数据采集方案flume-poll.conf配置文件中配置的sinks的port端口为8888，那么该端口为从flume中取数据的入口
    val pollingStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc,"192.168.25.100",8888)
    //5、获取flume中event事件的body内容：{"headers":xxxxxx,"body":xxxxx}
    val data: DStream[String] = pollingStream.map(x => new String(x.event.getBody.array()))
    //6、切分每一行，每个单词计为1
    //flatMap(_.split(" ")) 流的扁平化，最终输出的数据类型为一维数组Array[String]，所有单词都被分割出来作为一个元素存储到同一个一维数组Array[String]
    //map((_,1))每个单词记为1，即(单词，1)，表示每个单词封装为一个元祖，其key为单词，value为1，返回MapPartitionRDD数据
    val wordAndOne: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_,1))
    //7、累计统计相同单词出现的次数
    // 通过updateStateByKey(function函数名)实现所有批次的结果数据进行累加：传入实现累加计数的函数名
    //updateStateByKey(func)：根据key的之前状态值和key的新值，对key进行更新，返回一个新状态的DStream
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
    //8、打印输出
    result.print()
    //9、开启流式计算
    ssc.start()
    ssc.awaitTermination()
  }
}
